/*
 * Copyright 2014-2016 CyberVision, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.kaaproject.kaa.server.common.core.algorithms;

import static org.kaaproject.kaa.server.common.core.algorithms.CommonConstants.KAA_NAMESPACE;
import static org.kaaproject.kaa.server.common.core.algorithms.CommonConstants.UUID_FIELD;
import static org.kaaproject.kaa.server.common.core.algorithms.CommonConstants.UUID_SIZE;
import static org.kaaproject.kaa.server.common.core.algorithms.CommonConstants.UUID_TYPE;

import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.node.ObjectNode;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.UUID;

/**
 * Static helpers for working with Avro schemas and the Jackson JSON trees that
 * accompany them: UUID generation, schema lookup inside unions, property copying
 * and insertion/removal of the UUID field in JSON documents.
 */
public class AvroUtils {

  /**
   * Shared JSON parser. It is never reconfigured after construction and is used
   * only for {@code readTree}, so a single instance is reused instead of building
   * a new mapper on every call.
   */
  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

  private AvroUtils() {
  }

  /**
   * Generates a random UUID and encodes it as bytes: the most significant long
   * first, followed by the least significant long.
   *
   * @return a new array of {@code UUID_SIZE} bytes holding the generated UUID
   */
  public static byte[] generateUuidBytes() {
    UUID uuid = UUID.randomUUID();

    ByteBuffer byteBuffer = ByteBuffer.allocate(UUID_SIZE);
    byteBuffer.putLong(uuid.getMostSignificantBits());
    byteBuffer.putLong(uuid.getLeastSignificantBits());

    return byteBuffer.array();
  }

  /**
   * Generates a random UUID wrapped into an Avro fixed value.
   *
   * @return a fixed value whose schema is named {@code UUID_TYPE} in the
   *         {@code KAA_NAMESPACE} namespace and whose bytes come from
   *         {@link #generateUuidBytes()}
   */
  public static GenericData.Fixed generateUuidObject() {
    Schema avroSchema = Schema.createFixed(UUID_TYPE, null, KAA_NAMESPACE, UUID_SIZE);
    return new GenericData.Fixed(avroSchema, generateUuidBytes());
  }

  /**
   * Returns schema from container (union or schema itself) by its type.
   *
   * @param container schema container
   * @param type      schema type
   * @return          the container itself if it has the requested type, otherwise the
   *                  first union branch with that type, or {@code null} if there is none
   */
  public static Schema getSchemaByType(Schema container, Schema.Type type) {
    Type containerType = container.getType();
    if (containerType == type) {
      return container;
    }
    if (containerType != Type.UNION) {
      return null;
    }

    List<Schema> branches = container.getTypes();
    if (branches != null) {
      for (Schema branch : branches) {
        if (branch.getType() == type) {
          return branch;
        }
      }
    }
    return null;
  }

  /**
   * Returns whether schema is complex.
   *
   * @param schema schema
   * @return       true if the schema is a record, array, map, fixed or enum,
   *               otherwise false
   */
  public static boolean isComplexSchema(Schema schema) {
    switch (schema.getType()) {
      case RECORD:
      case ARRAY:
      case MAP:
      case FIXED:
      case ENUM:
        return true;
      default:
        return false;
    }
  }

  /**
   * Copies every json property of the source onto the destination.
   *
   * @param src source
   * @param dst destination
   */
  public static void copyJsonProperties(JsonProperties src, JsonProperties dst) {
    for (Map.Entry<String, JsonNode> prop : src.getJsonProps().entrySet()) {
      dst.addProp(prop.getKey(), prop.getValue());
    }
  }

  /**
   * Parses the given JSON text and sets the UUID field of every record that
   * declares one in the schema.
   *
   * @param json   JSON document as text
   * @param schema the schema describing the document
   * @return the processed document serialized back to a string
   * @throws IOException if the text cannot be parsed as JSON
   */
  public static String injectUuids(String json, Schema schema) throws IOException {
    // Parse the characters directly: converting to bytes first would go through
    // the platform default charset and could corrupt non-ASCII content.
    JsonNode root = OBJECT_MAPPER.readTree(json);
    injectUuidsFromJsonNodes(root, schema);
    return root.toString();
  }

  /**
   * Parses the given JSON bytes and sets the UUID field of every record that
   * declares one in the schema.
   *
   * @param content JSON document as encoded bytes
   * @param schema  the schema describing the document
   * @return the processed document serialized to a string
   * @throws IOException if the content cannot be parsed as JSON
   */
  public static String injectUuids(byte[] content, Schema schema) throws IOException {
    JsonNode root = OBJECT_MAPPER.readTree(content);
    injectUuidsFromJsonNodes(root, schema);
    return root.toString();
  }

  /**
   * Sets the UUID field of every record in the given tree that declares one in
   * the schema. The tree is modified in place.
   *
   * @param root   root of the JSON tree
   * @param schema the schema describing the tree
   * @return the processed tree serialized to a string
   * @throws NullPointerException if {@code root} is {@code null}
   */
  public static String injectUuids(JsonNode root, Schema schema) {
    return injectUuidsFromJsonNodes(root, schema).toString();
  }

  /**
   * Walks the JSON tree alongside the schema (records, unions and arrays only)
   * and, for every record whose schema declares {@code UUID_FIELD}, sets that
   * field to JSON null. No UUID value is generated here.
   *
   * @param json   current JSON node, may be {@code null}
   * @param schema schema matching the current node
   * @return the same node that was passed in
   */
  private static JsonNode injectUuidsFromJsonNodes(JsonNode json, Schema schema) {
    if (json == null) {
      return json;
    }

    switch (schema.getType()) {
      case RECORD: {
        boolean addressable = false;
        for (Schema.Field field : schema.getFields()) {
          if (UUID_FIELD.equals(field.name())) {
            addressable = true;
          } else {
            injectUuidsFromJsonNodes(json.get(field.name()), field.schema());
          }
        }
        if (addressable) {
          // The cast assumes a record is always represented by a JSON object.
          ((ObjectNode) json).putNull(UUID_FIELD);
        }
        break;
      }
      case UNION:
        // Each branch is looked up under a key equal to the branch schema name;
        // branches absent from the node yield null and are skipped by the guard above.
        for (Schema branch : schema.getTypes()) {
          injectUuidsFromJsonNodes(json.get(branch.getName()), branch);
        }
        break;
      case ARRAY: {
        Schema elementSchema = schema.getElementType();
        // Recurse into the helper directly: going through the public overload
        // would serialize every element to a string only to discard it.
        for (JsonNode element : json) {
          injectUuidsFromJsonNodes(element, elementSchema);
        }
        break;
      }
      default:
        break;
    }

    return json;
  }

  /**
   * Removes UUIDs from a json tree.
   *
   * <p>The {@code UUID_FIELD} entry is removed from a node when the node is a
   * non-array container that has this field, unless the node has exactly one
   * child and that child is an array. All container children are then processed
   * recursively. The tree is modified in place.
   *
   * @param json json tree node
   */
  public static void removeUuids(JsonNode json) {
    boolean containerWithId = json.isContainerNode() && json.has(UUID_FIELD);
    boolean isArray = json.isArray();
    // Short-circuit keeps next() from being called on an empty node.
    boolean childIsNotArray = !(json.size() == 1 && json.getElements().next().isArray());

    if (containerWithId && !isArray && childIsNotArray) {
      ((ObjectNode) json).remove(UUID_FIELD);
    }

    for (JsonNode node : json) {
      if (node.isContainerNode()) {
        removeUuids(node);
      }
    }
  }
}
